//
//  ========================================================================
//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

/**
 * <p>A {@link Connection} that handles the HTTP protocol.</p>
 */
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
{
    private static final Logger LOG = Log.getLogger(HttpConnection.class);
    /** Pre-encoded {@code Connection: close} header field. */
    public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
    /** Name of the request attribute that {@link #onCompleted()} reads to find the {@link Connection} to upgrade to. */
    public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
    // Flags passed as the second argument of ByteBufferPool.acquire(...) when the
    // request, response header and chunk buffers are acquired.
    private static final boolean REQUEST_BUFFER_DIRECT=false;
    private static final boolean HEADER_BUFFER_DIRECT=false;
    private static final boolean CHUNK_BUFFER_DIRECT=false;
    // Per-thread "current" connection; set on entry to onFillable() and restored on exit.
    private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();

    private final HttpConfiguration _config;
    private final Connector _connector;
    // Pool obtained from the connector; all buffers used by this connection come from it.
    private final ByteBufferPool _bufferPool;
    // The HttpInput of the channel's request (see constructor).
    private final HttpInput _input;
    private final HttpGenerator _generator;
    private final HttpChannelOverHttp _channel;
    private final HttpParser _parser;
    // Number of Content instances created by this connection that have not yet been
    // succeeded/failed. The request buffer is neither refilled nor released while it is non-zero.
    private final AtomicInteger _contentBufferReferences=new AtomicInteger();
    // Buffer that request bytes are filled into and parsed from; null until acquired and after release.
    private volatile ByteBuffer _requestBuffer = null;
    // Chunk buffer acquired by SendCallback when the generator asks for one; released in onCompleted().
    private volatile ByteBuffer _chunk = null;
    private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
    private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
    private final SendCallback _sendCallback = new SendCallback();

    /**
     * Returns the connection that the calling thread is currently dispatched to.
     * <p>The value lives in a thread local that {@link #onFillable()} sets on entry and
     * restores on exit. A thread that processes a request outside of that call (for
     * example asynchronously) is therefore not dispatched to the connection and may
     * see {@code null}.</p>
     * @return the current HttpConnection or null
     * @see Request#getAttribute(String) for a more general way to access the HttpConnection
     */
    public static HttpConnection getCurrentConnection()
    {
        return __currentConnection.get();
    }

    /**
     * Replaces the calling thread's current connection.
     * @param connection the connection to record for this thread (may be null)
     * @return the connection that was recorded before this call, or null if there was none
     */
    protected static HttpConnection setCurrentConnection(HttpConnection connection)
    {
        final HttpConnection previous = getCurrentConnection();
        __currentConnection.set(connection);
        return previous;
    }

    /**
     * Creates a connection bound to the given end point.
     * <p>The generator, channel and parser are created through the overridable
     * {@code newHttp*} factory methods. The channel is created before the parser
     * because {@link #newRequestHandler()} hands the channel to the parser.</p>
     * @param config the HTTP configuration used by this connection
     * @param connector the connector that supplies the executor and the buffer pool
     * @param endPoint the end point to read from and write to
     */
    public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
    {
        super(endPoint, connector.getExecutor());

        _config = config;
        _connector = connector;
        _bufferPool = connector.getByteBufferPool();

        // Order matters: the channel must exist before the input and the parser are set up.
        _generator = newHttpGenerator();
        _channel = newHttpChannel();
        _input = _channel.getRequest().getHttpInput();
        _parser = newHttpParser();

        if (LOG.isDebugEnabled())
            LOG.debug("New HTTP Connection {}", this);
    }

    /**
     * @return the HTTP configuration this connection was constructed with
     */
    public HttpConfiguration getHttpConfiguration()
    {
        return _config;
    }

    /**
     * Factory for the response generator, called once from the constructor.
     * @return a new generator configured with the "send server version" and
     * "send X-Powered-By" settings of the HTTP configuration
     */
    protected HttpGenerator newHttpGenerator()
    {
        return new HttpGenerator(_config.getSendServerVersion(), _config.getSendXPoweredBy());
    }

    /**
     * Factory for the HTTP channel, called once from the constructor.
     * <p>This connection is supplied twice, as the first and as the last constructor
     * argument (presumably the last one is the {@link HttpTransport} role, which this
     * class implements; confirm against {@code HttpChannelOverHttp}).</p>
     * @return a new channel for this connection
     */
    protected HttpChannelOverHttp newHttpChannel()
    {
        return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
    }

    /**
     * Factory for the request parser, called once from the constructor.
     * @return a new parser that reports to {@link #newRequestHandler()} and is given
     * the configured request header size
     */
    protected HttpParser newHttpParser()
    {
        final HttpParser.RequestHandler handler = newRequestHandler();
        return new HttpParser(handler, getHttpConfiguration().getRequestHeaderSize());
    }

    /**
     * @return the handler that receives parser events; by default the HTTP channel itself
     */
    protected HttpParser.RequestHandler newRequestHandler()
    {
        return _channel;
    }

    /**
     * @return the server of the connector that created this connection
     */
    public Server getServer()
    {
        return _connector.getServer();
    }

    /**
     * @return the connector this connection was constructed with
     */
    public Connector getConnector()
    {
        return _connector;
    }

    /**
     * @return the HTTP channel created for this connection
     */
    public HttpChannel getHttpChannel()
    {
        return _channel;
    }

    /**
     * @return the request parser used by this connection
     */
    public HttpParser getParser()
    {
        return _parser;
    }

    /**
     * @return the response generator used by this connection
     */
    public HttpGenerator getGenerator()
    {
        return _generator;
    }

    /**
     * Delegates to the end point.
     * @return whatever the end point reports for {@code isOptimizedForDirectBuffers()}
     */
    @Override
    public boolean isOptimizedForDirectBuffers()
    {
        return getEndPoint().isOptimizedForDirectBuffers();
    }

    /**
     * @return the number of requests reported by the HTTP channel
     */
    @Override
    public int getMessagesIn()
    {
        return getHttpChannel().getRequests();
    }

    /**
     * Reported using the channel's request count, i.e. the same value as
     * {@link #getMessagesIn()}.
     * @return the number of requests reported by the HTTP channel
     */
    @Override
    public int getMessagesOut()
    {
        return getHttpChannel().getRequests();
    }

    /**
     * Hands over any unconsumed request bytes when this connection is upgraded.
     * <p>If the request buffer has content, this connection drops its reference to
     * the buffer and returns it to the caller; the buffer is not released to the pool here.</p>
     * @return the request buffer if it has content, otherwise null
     */
    @Override
    public ByteBuffer onUpgradeFrom()
    {
        if (!BufferUtil.hasContent(_requestBuffer))
            return null;

        final ByteBuffer unconsumed = _requestBuffer;
        _requestBuffer = null;
        return unconsumed;
    }

    /**
     * Returns the request buffer to the pool, but only if there is one and it has
     * no remaining bytes. Otherwise this method does nothing.
     */
    void releaseRequestBuffer()
    {
        // Nothing to release, or the buffer still holds unparsed bytes.
        if (_requestBuffer == null || _requestBuffer.hasRemaining())
            return;

        if (LOG.isDebugEnabled())
            LOG.debug("releaseRequestBuffer {}",this);

        // Clear the field before giving the buffer back to the pool.
        final ByteBuffer released = _requestBuffer;
        _requestBuffer = null;
        _bufferPool.release(released);
    }

    /**
     * Returns the request buffer, acquiring one from the buffer pool (sized by
     * {@code getInputBufferSize()}) if none is currently held.
     * @return the request buffer
     */
    public ByteBuffer getRequestBuffer()
    {
        if (_requestBuffer != null)
            return _requestBuffer;

        _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
        return _requestBuffer;
    }

    /**
     * @return true if the request buffer is empty, as determined by
     * {@link BufferUtil#isEmpty(ByteBuffer)}
     */
    public boolean isRequestBufferEmpty()
    {
        return BufferUtil.isEmpty(_requestBuffer);
    }

    /**
     * Main read loop: repeatedly fills the request buffer, parses it and, when the
     * parser asks for it, lets the channel handle the request.
     * <p>The loop ends when the end point has been upgraded to another connection,
     * when the parser is closed (this connection is then closed), when the channel's
     * {@code handle()} returns false, or when nothing was parsed and no bytes were read.
     * In the last case fill interest is registered again if the fill returned 0.</p>
     * <p>For the duration of the call this connection is recorded as the thread's
     * current connection (see {@link #getCurrentConnection()}).</p>
     */
    @Override
    public void onFillable()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} onFillable enter {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));

        final HttpConnection previous = setCurrentConnection(this);
        try
        {
            for (;;)
            {
                // Read (if needed) and then parse whatever is in the request buffer.
                final int read = fillRequestBuffer();
                final boolean dispatch = parseRequestBuffer();

                // An upgrade replaced this connection on the end point: the new
                // connection has taken over, so there is nothing left to do here.
                if (getEndPoint().getConnection() != this)
                    break;

                // The parser is closing or closed: close the connection.
                if (_parser.isClose() || _parser.isClosed())
                {
                    close();
                    break;
                }

                if (dispatch)
                {
                    // handle() returning false means the channel has suspended; stop iterating
                    // then, and also if handling swapped the connection on the end point.
                    final boolean keepGoing = _channel.handle();
                    if (!keepGoing || getEndPoint().getConnection() != this)
                        break;
                    continue;
                }

                // Nothing to handle: go round again only if bytes were actually read.
                if (read > 0)
                    continue;

                // No data right now (0): wait for more. EOF/failure (<0): just stop.
                if (read == 0)
                    fillInterested();
                break;
            }
        }
        finally
        {
            setCurrentConnection(previous);
            if (LOG.isDebugEnabled())
                LOG.debug("{} onFillable exit {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Fills and parses while the parser is in a content state, looking for content.
     * <p>Iteration stops as soon as a parser callback asked for handling, the fill
     * returned no bytes (0 or negative), or the request's input has content.</p>
     * @return true if an {@link RequestHandler} method was called and it returned true;
     */
    protected boolean fillAndParseForContent()
    {
        while (_parser.inContentState())
        {
            final int read = fillRequestBuffer();

            // A handler callback returned true: report it to the caller.
            if (parseRequestBuffer())
                return true;

            // Nothing more to read right now, or content is already available.
            if (read <= 0 || _channel.getRequest().getHttpInput().hasContent())
                return false;
        }
        return false;
    }

    /* ------------------------------------------------------------ */
    /**
     * Fills the request buffer from the end point, if that is currently possible.
     * <p>No fill is attempted (and 0 is returned) while content references are
     * outstanding or while the request buffer still has content. Whenever -1 is
     * returned (input already shut down, the end point reported EOF, or the fill
     * threw an {@link IOException}) the parser has been told via {@code atEOF()}.</p>
     * @return the number of bytes filled, 0 if nothing was filled, or -1 on EOF or fill failure
     */
    private int fillRequestBuffer()
    {
        if (_contentBufferReferences.get()>0)
        {
            LOG.warn("{} fill with unconsumed content!",this);
            return 0;
        }

        if (BufferUtil.isEmpty(_requestBuffer))
        {
            // Can we fill?
            if(getEndPoint().isInputShutdown())
            {
                // No, so pretend we read -1
                _parser.atEOF();
                if (LOG.isDebugEnabled())
                    LOG.debug("{} filled -1 {}",this,BufferUtil.toDetailString(_requestBuffer));
                return -1;
            }

            // Get a buffer
            // We are not in a race here for the request buffer as we have not yet received a request,
            // so there are no legal threads that could be calling #parseContent or #completed.
            _requestBuffer = getRequestBuffer();

            // fill
            try
            {
                int filled = getEndPoint().fill(_requestBuffer);
                if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
                    filled = getEndPoint().fill(_requestBuffer);

                // tell parser
                if (filled < 0)
                    _parser.atEOF();

                if (LOG.isDebugEnabled())
                    LOG.debug("{} filled {} {}",this,filled,BufferUtil.toDetailString(_requestBuffer));

                return filled;
            }
            catch (IOException e)
            {
                LOG.debug(e);
                // A failed fill is reported to the caller as EOF (-1), so tell the parser
                // too, exactly like the other paths of this method that return -1.
                _parser.atEOF();
                return -1;
            }
        }
        return 0;
    }

    /* ------------------------------------------------------------ */
    /**
     * Parses the current request buffer (or an empty buffer if there is none) and,
     * if no content references are outstanding, offers the buffer for release.
     * @return the value returned by the parser's {@code parseNext}, i.e. whether the
     * caller should go on to handle the channel
     */
    private boolean parseRequestBuffer()
    {
        // Two arguments, so exactly two placeholders.
        if (LOG.isDebugEnabled())
            LOG.debug("{} parse {}",this,BufferUtil.toDetailString(_requestBuffer));

        boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);

        if (LOG.isDebugEnabled())
            LOG.debug("{} parsed {} {}",this,handle,_parser);

        // recycle buffer ? (releaseRequestBuffer only releases a fully consumed buffer)
        if (_contentBufferReferences.get()==0)
            releaseRequestBuffer();

        return handle;
    }

    /* ------------------------------------------------------------ */
    /**
     * Called when the current request/response cycle is complete.
     * <p>In order, this method:</p>
     * <ol>
     * <li>performs a connection upgrade and returns, if the response status is 101 and the
     * request carries a {@link #UPGRADE_CONNECTION_ATTRIBUTE} attribute;</li>
     * <li>otherwise deals with unconsumed request input: closes the parser if a
     * 100-continue is still expected, or, for a persistent connection with the parser
     * still in a content state, consumes the remaining input (aborting the channel if
     * the input is async or cannot be fully consumed);</li>
     * <li>recycles the channel, resets or closes the parser, releases the chunk buffer
     * and resets the generator;</li>
     * <li>if the calling thread is not inside {@link #onFillable()} for this connection,
     * arranges for the next request to be read: registers fill interest, dispatches
     * this connection to the executor for already-buffered (pipelined) data, or
     * closes the end point.</li>
     * </ol>
     */
    @Override
    public void onCompleted()
    {
        // Handle connection upgrades
        if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
        {
            Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
            if (connection != null)
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("Upgrade from {} to {}", this, connection);
                _channel.getState().upgrade();
                getEndPoint().upgrade(connection);
                _channel.recycle();
                _parser.reset();
                _generator.reset();
                if (_contentBufferReferences.get()==0)
                    releaseRequestBuffer();
                else
                {
                    // Content objects still reference the buffer, so it cannot go back to the pool.
                    LOG.warn("{} lingering content references?!?!",this);
                    _requestBuffer=null; // Not returned to pool!
                    _contentBufferReferences.set(0);
                }
                return;
            }
        }

        // Finish consuming the request
        // If we are still expecting
        if (_channel.isExpecting100Continue())
        {
            // close to seek EOF
            _parser.close();
        }
        else if (_parser.inContentState() && _generator.isPersistent())
        {
            // If we are async, then we have problems to complete neatly
            if (_channel.getRequest().getHttpInput().isAsync())
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("unconsumed async input {}", this);
                _channel.abort(new IOException("unconsumed input"));
            }
            else
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("unconsumed input {}", this);
                // Complete reading the request
                if (!_channel.getRequest().getHttpInput().consumeAll())
                    _channel.abort(new IOException("unconsumed input"));
            }
        }

        // Reset the channel, parsers and generator
        _channel.recycle();
        if (_generator.isPersistent() && !_parser.isClosed())
            _parser.reset();
        else
            _parser.close();

        // Not in a race here with onFillable, because it has given up control before calling handle.
        // in a slight race with #completed, but not sure what to do with that anyway.
        if (_chunk!=null)
            _bufferPool.release(_chunk);
        _chunk=null;
        _generator.reset();

        // if we are not called from the onfillable thread, schedule completion
        // (when called from within onFillable, its own loop continues with the next request)
        if (getCurrentConnection()!=this)
        {
            // If we are looking for the next request
            if (_parser.isStart())
            {
                // if the buffer is empty
                if (BufferUtil.isEmpty(_requestBuffer))
                {
                    // look for more data
                    fillInterested();
                }
                // else if we are still running
                else if (getConnector().isRunning())
                {
                    // Dispatched to handle a pipelined request
                    try
                    {
                        getExecutor().execute(this);
                    }
                    catch (RejectedExecutionException e)
                    {
                        // A rejection is only worth a warning while the connector is still running.
                        if (getConnector().isRunning())
                            LOG.warn(e);
                        else
                            LOG.ignore(e);
                        getEndPoint().close();
                    }
                }
                else
                {
                    getEndPoint().close();
                }
            }
            // else the parser must be closed, so seek the EOF if we are still open
            else if (getEndPoint().isOpen())
                fillInterested();
        }
    }

    /**
     * Closes the parser and then lets the superclass process the failure.
     * @param cause the reason the fill interest failed
     */
    @Override
    protected void onFillInterestedFailed(Throwable cause)
    {
        _parser.close();
        super.onFillInterestedFailed(cause);
    }

    /**
     * After the superclass open processing, registers interest in the end point
     * becoming fillable so that the first request can be read.
     */
    @Override
    public void onOpen()
    {
        super.onOpen();
        fillInterested();
    }

    /**
     * Closes the send callback before delegating to the superclass close processing.
     */
    @Override
    public void onClose()
    {
        _sendCallback.close();
        super.onClose();
    }

    /**
     * {@link Runnable} entry point; simply runs {@link #onFillable()}. Used by
     * {@link #onCompleted()} when it dispatches this connection to the executor.
     */
    @Override
    public void run()
    {
        onFillable();
    }

    /**
     * Sends response data through the send callback.
     * <p>If {@code info} is null, the content is empty and this is not the last
     * content, there is nothing to do and the callback is succeeded immediately.
     * If {@code info} is not null while the channel is still expecting a
     * 100-continue, the generator is made non-persistent.</p>
     * @param info the response meta data, or null
     * @param head passed through to the generator (the write path skips chunk and content when it is true)
     * @param content the content to send, may be null or empty
     * @param lastContent true if this is the last content of the response
     * @param callback the callback to complete when the send has finished or failed
     */
    @Override
    public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
    {
        if (info != null)
        {
            // If we are still expecting a 100 continue when we commit,
            // then we can't be persistent.
            if (_channel.isExpecting100Continue())
                _generator.setPersistent(false);
        }
        else if (!lastContent && BufferUtil.isEmpty(content))
        {
            // Nothing to write at all.
            callback.succeeded();
            return;
        }

        final boolean ready = _sendCallback.reset(info,head,content,lastContent,callback);
        if (ready)
            _sendCallback.iterate();
    }


    /**
     * Wraps a buffer in a {@link Content}, which counts as one outstanding
     * reference to the request buffer until it is succeeded or failed.
     * @param c the content buffer
     * @return the new content object
     */
    HttpInput.Content newContent(ByteBuffer c)
    {
        return new Content(c);
    }

    /**
     * Aborts the response by closing the end point directly.
     * @param failure the cause of the abort (not used by this implementation)
     */
    @Override
    public void abort(Throwable failure)
    {
        // A direct close of the output may indicate to the client that the response is
        // bad, either with a RST or by abnormal completion of a chunked response.
        getEndPoint().close();
    }

    /**
     * @return always false for this transport
     */
    @Override
    public boolean isPushSupported()
    {
        return false;
    }

    /**
     * Push is not supported (see {@link #isPushSupported()}); the request is
     * ignored apart from a debug log entry.
     * @param request the request to push (ignored)
     */
    @Override
    public void push(org.eclipse.jetty.http.MetaData.Request request)
    {
        LOG.debug("ignore push in {}",this);
    }

    /**
     * Registers the asynchronous read callback for the next time the end point is fillable.
     */
    public void asyncReadFillInterested()
    {
        getEndPoint().fillInterested(_asyncReadCallback);
    }

    /**
     * Registers the blocking read callback for the next time the end point is fillable.
     */
    public void blockingReadFillInterested()
    {
        getEndPoint().fillInterested(_blockingReadCallback);
    }

    /**
     * Fails the blocking read callback, which passes the failure on to the request input.
     * @param e the failure to report
     */
    public void blockingReadException(Throwable e)
    {
        _blockingReadCallback.failed(e);
    }

    /**
     * @return the superclass description followed by the parser ({@code p}),
     * generator ({@code g}) and channel ({@code c})
     */
    @Override
    public String toString()
    {
        final String base = super.toString();
        return String.format("%s[p=%s,g=%s,c=%s]", base, _parser, _generator, _channel);
    }

    /**
     * Content that keeps count of references into the request buffer: creating an
     * instance increments the connection's reference count, completing it (with
     * success or failure) decrements the count and, when it reaches zero, offers
     * the request buffer for release.
     */
    private class Content extends HttpInput.Content
    {
        public Content(ByteBuffer content)
        {
            super(content);
            _contentBufferReferences.incrementAndGet();
        }

        @Override
        public void succeeded()
        {
            final int outstanding = _contentBufferReferences.decrementAndGet();
            if (outstanding == 0)
                releaseRequestBuffer();
        }

        @Override
        public void failed(Throwable x)
        {
            // A failure drops the reference in exactly the same way as a success.
            succeeded();
        }
    }

    /**
     * Fill-interest callback used for blocking reads: it forwards the outcome to
     * the request input, which unblocks or fails the waiting reader.
     */
    private class BlockingReadCallback implements Callback
    {
        @Override
        public void succeeded()
        {
            _input.unblock();
        }

        @Override
        public void failed(Throwable x)
        {
            _input.failed(x);
        }

        @Override
        public boolean isNonBlocking()
        {
            // The callback itself never blocks: all it does is wake up
            // the thread that is blocked waiting on the read.
            return true;
        }
    }

    /**
     * Fill-interest callback used for asynchronous reads.
     * <p>On success it fills and parses for content: if a parser callback asked for
     * handling, the channel is handled; otherwise, unless the input is finished,
     * fill interest is registered again. On failure the input is failed and the
     * channel is handled if the input asks for it.</p>
     */
    private class AsyncReadCallback implements Callback
    {
        @Override
        public void succeeded()
        {
            if (fillAndParseForContent())
            {
                _channel.handle();
                return;
            }

            if (!_input.isFinished())
                asyncReadFillInterested();
        }

        @Override
        public void failed(Throwable x)
        {
            final boolean wakeup = _input.failed(x);
            if (wakeup)
                _channel.handle();
        }
    }

    /**
     * Iterating callback that drives the response generator for one
     * {@link HttpConnection#send(MetaData.Response, boolean, ByteBuffer, boolean, Callback)}
     * call: each iteration asks the generator what to do next, acquires header and
     * chunk buffers on demand and writes the generated buffers to the end point.
     * The caller's callback is completed once the whole send has succeeded or failed.
     */
    private class SendCallback extends IteratingCallback
    {
        private MetaData.Response _info;
        private boolean _head;
        private ByteBuffer _content;
        private boolean _lastContent;
        private Callback _callback;
        private ByteBuffer _header;
        private boolean _shutdownOut;

        private SendCallback()
        {
            super(true);
        }

        @Override
        public boolean isNonBlocking()
        {
            return _callback.isNonBlocking();
        }

        /**
         * Prepares this callback for a new send.
         * @return true if the callback was reset and may be iterated; false if it could
         * not be reset, in which case {@code callback} has already been failed
         */
        private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
        {
            if (!reset())
            {
                // Could not reset: either the callback is closed or a send is still in progress.
                if (isClosed())
                    callback.failed(new EofException());
                else
                    callback.failed(new WritePendingException());
                return false;
            }

            _info = info;
            _head = head;
            _content = content;
            _lastContent = last;
            _callback = callback;
            _header = null;
            _shutdownOut = false;
            return true;
        }

        @Override
        public Action process() throws Exception
        {
            if (_callback==null)
                throw new IllegalStateException();

            ByteBuffer chunk = _chunk;
            for (;;)
            {
                final HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
                if (LOG.isDebugEnabled())
                    LOG.debug("{} generate: {} ({},{},{})@{}",
                        this,
                        result,
                        BufferUtil.toSummaryString(_header),
                        BufferUtil.toSummaryString(_content),
                        _lastContent,
                        _generator.getState());

                switch (result)
                {
                    case NEED_HEADER:
                    {
                        _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
                        continue;
                    }
                    case NEED_CHUNK:
                    {
                        // Remember the chunk buffer on the connection so that onCompleted() can release it.
                        final ByteBuffer acquired = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
                        _chunk = acquired;
                        chunk = acquired;
                        continue;
                    }
                    case FLUSH:
                    {
                        // Don't write the chunk or the content if this is a HEAD response,
                        // or any other type of response that should have no content
                        if (_head || _generator.isNoContent())
                        {
                            BufferUtil.clear(chunk);
                            BufferUtil.clear(_content);
                        }

                        final boolean hasHeader = BufferUtil.hasContent(_header);
                        final boolean hasChunk = BufferUtil.hasContent(chunk);
                        final boolean hasBody = BufferUtil.hasContent(_content);

                        if (hasHeader)
                        {
                            // With a header, the chunk is only written together with content.
                            if (!hasBody)
                                getEndPoint().write(this, _header);
                            else if (hasChunk)
                                getEndPoint().write(this, _header, chunk, _content);
                            else
                                getEndPoint().write(this, _header, _content);
                        }
                        else if (hasChunk)
                        {
                            if (hasBody)
                                getEndPoint().write(this, chunk, _content);
                            else
                                getEndPoint().write(this, chunk);
                        }
                        else if (hasBody)
                        {
                            getEndPoint().write(this, _content);
                        }
                        else
                        {
                            succeeded(); // nothing to write
                        }
                        return Action.SCHEDULED;
                    }
                    case SHUTDOWN_OUT:
                    {
                        // Remember to shut down the output once the send has completed.
                        _shutdownOut = true;
                        continue;
                    }
                    case DONE:
                    {
                        return Action.SUCCEEDED;
                    }
                    case CONTINUE:
                    {
                        continue;
                    }
                    default:
                    {
                        throw new IllegalStateException("generateResponse="+result);
                    }
                }
            }
        }

        /** Returns the header buffer, if any, to the pool and forgets it. */
        private void releaseHeader()
        {
            final ByteBuffer header = _header;
            _header = null;
            if (header != null)
                _bufferPool.release(header);
        }

        @Override
        protected void onCompleteSuccess()
        {
            releaseHeader();
            _callback.succeeded();
            if (_shutdownOut)
                getEndPoint().shutdownOutput();
        }

        @Override
        public void onCompleteFailure(final Throwable x)
        {
            releaseHeader();
            failedCallback(_callback,x);
            if (_shutdownOut)
                getEndPoint().shutdownOutput();
        }

        @Override
        public String toString()
        {
            return String.format("%s[i=%s,cb=%s]", super.toString(), _info, _callback);
        }
    }
}
